Parallel computing with Python

multiprocessing

The built-in module multiprocessing provides functionality to create processes which run given tasks.

http://docs.python.org/2/library/multiprocessing.html

All strategies for parallelization in Python have a rather large overhead compared to lower-level languages such as C or FORTRAN.

The way multiprocessing runs code in parallel is by launching subprocesses with a separate interpreter for each process. This means that in order to gain speed, the computation we want to perform should be relatively substantial.

(In case you are familiar with threads: It should be noted that Python has a threading module for working with threads; however, because of the Global Interpreter Lock only one thread executes Python bytecode at a time, so all threads will effectively run on a single CPU.)

By using multiprocessing we can utilize the machines we are running code on more efficiently.


In [1]:
import multiprocessing

In [2]:
multiprocessing.cpu_count()


Out[2]:
8

Before talking about some more advanced features, let's describe the most typical use pattern of multiprocessing.

Note: multiprocessing can be used in the IPython Notebook, but there are sometimes issues with printing from subprocesses. To make things clearer and avoid complications we shall run external scripts instead.

Process

Processes share nothing

To spawn a process, instantiate a Process with a target function and call its .start() method.

This method will arrange for the given code to be run in a separate process from the parent process. To make the parent process wait until a child process has finished before moving on, one needs to call the .join() method.


In [3]:
import os
os.getpid()


Out[3]:
4608

In [4]:
%%file mp.py
from multiprocessing import Process
import os

def worker():
    print("Worker process {}".format(os.getpid()))

if __name__ == "__main__":
    proc1 = Process(target=worker)
    proc1.start()
    proc2 = Process(target=worker)
    proc2.start()


Writing mp.py

In [4]:
import subprocess
def run(script_file):
    print subprocess.Popen(['python', script_file], stdout=subprocess.PIPE).communicate()[0]

In [5]:
run('mp.py')


Worker process 4364
Worker process 6844

To get the target function to actually work on some input, you need to provide the arguments in the constructor of the Process.


In [23]:
%%file mp.py
from multiprocessing import Process
import os

def worker(arg):
    print("Worker process {}, argument was {}".format(os.getpid(), arg))

if __name__ == "__main__":
    proc1 = Process(target=worker, args=(10,))
    proc1.start()
    proc2 = Process(target=worker, args=(11,))
    proc2.start()


Overwriting mp.py

In [24]:
run('mp.py')


Worker process 7532, argument was 10
Worker process 864, argument was 11

Processes communicate over interprocess communication channels:

  • Queue
  • Pipe

Pipe

Gives a pair of connection objects which are connected by a pipe. Use the send and recv methods on these objects to communicate between processes.

Queue

Gives a thread- and process-safe queue shared between processes. It can contain any pickleable object.


In [25]:
%%file mp2.py
from multiprocessing import Process, Queue
import os

def worker(tasks, results):
    t = tasks.get()
    result = t * 2
    results.put([os.getpid(), t, "->", result])

if __name__ == "__main__":
    n = 20
    my_tasks = Queue()
    my_results = Queue()
    
    workers = [Process(target=worker, args=(my_tasks, my_results)) for i in range(n)]
    
    for proc in workers:
        proc.start()
    
    for i in range(n):
        my_tasks.put(i)
    
    for i in range(n):
        result = my_results.get()
        print(result)


Writing mp2.py

In [26]:
run('mp2.py')


[4952, 0, '->', 0]
[3380, 1, '->', 2]
[7960, 2, '->', 4]
[7844, 3, '->', 6]
[6896, 4, '->', 8]
[7480, 6, '->', 12]
[6696, 5, '->', 10]
[1752, 7, '->', 14]
[6148, 10, '->', 20]
[7464, 9, '->', 18]
[6128, 11, '->', 22]
[8124, 8, '->', 16]
[4208, 12, '->', 24]
[6400, 13, '->', 26]
[3472, 14, '->', 28]
[6828, 15, '->', 30]
[3572, 16, '->', 32]
[8188, 17, '->', 34]
[4200, 18, '->', 36]
[6836, 19, '->', 38]

Because the processes are executed in parallel, we cannot know in advance the order in which results are put in the Queue.


In [27]:
from multiprocessing import Queue

In [28]:
q = Queue()

In [29]:
q.get?
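As the docstring shows, .get() blocks by default; here is a short sketch of its block and timeout parameters (the values are arbitrary):

```python
from multiprocessing import Queue
try:
    from queue import Empty    # Python 3
except ImportError:
    from Queue import Empty    # Python 2

q = Queue()
q.put(42)
item = q.get()                 # blocks until an item is available

timed_out = False
try:
    q.get(timeout=0.1)         # wait at most 0.1 s, then raise Empty
except Empty:
    timed_out = True

print("got {}, timed out: {}".format(item, timed_out))
```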

Manager

A special Process which holds Python objects so that other processes can manipulate them. When a managed object is manipulated somewhere, the manager will make sure the change is propagated to any other processes using the managed object.


In [42]:
%%file mp3.py
from multiprocessing import Process, Manager
import os

def worker(l):
    p = os.getpid()
    print(p)
    l[int(str(p)[-1:])] += 1   # bin processes by the last digit of their pid

if __name__ == "__main__":
    n = 10
    manager = Manager()
    l = manager.list()
    l.extend([0] * n)
    processes = [Process(target=worker, args=(l,)) for i in range(20)]
    
    for proc in processes:
        proc.start()
    
    for proc in processes:
        proc.join()
    
    print(l)
    print sum(l)


Overwriting mp3.py

In [43]:
run('mp3.py')


6444
7176
3312
6776
5712
4452
7960
4952
7772
3640
2436
1924
4216
6872
4228
7468
7048
6276
8144
6820
[3, 0, 6, 0, 3, 0, 5, 0, 3, 0]
20

Pool

The Pool class distributes work between workers and collects the results as a list. It is extremely handy for quickly implementing some simple parallelization.


In [44]:
%%file mp.py
import multiprocessing
import os


def task(args):
    print "Running process", os.getpid(), "with args", args
    return os.getpid(), args


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = pool.map(task, [1,2,3,4]*3)
    print(result)


Overwriting mp.py

In [45]:
run('mp.py')


Running process 6648 with args 1
Running process 6648 with args 2
Running process 6648 with args 3
Running process 6648 with args 4
Running process 6648 with args 1
Running process 6648 with args 2
Running process 6648 with args 3
Running process 6648 with args 4
Running process 6648 with args 1
Running process 6648 with args 2
Running process 6648 with args 3
Running process 6648 with args 4
[(6648, 1), (6648, 2), (6648, 3), (6648, 4), (6648, 1), (6648, 2), (6648, 3), (6648, 4), (6648, 1), (6648, 2), (6648, 3), (6648, 4)]

The method .map() works like the built-in function map(), but sends the elements of the iterable to different processes. By default it sends one element at a time, but this can be changed with the chunksize parameter.

A similar method called .map_async() does not block while the results are computed, which often works better in parallel programs; in that case one has to fetch the results using the .get() method of the value returned by .map_async() (an instance of the class AsyncResult).


In [50]:
%%file mp.py
import multiprocessing
import os

def task(args):
    print "Running process", os.getpid(), "with args", args
    return os.getpid(), args


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = pool.map_async(task, [1,2,3,4])
    print(result.get())


Overwriting mp.py

In [51]:
run('mp.py')


Running process 6672 with args 1
Running process 6672 with args 2
Running process 6672 with args 3
Running process 6672 with args 4
[(6672, 1), (6672, 2), (6672, 3), (6672, 4)]